package com.xueyuan.wata.daph.flink117.utils

import com.xueyuan.wata.daph.flink117.enums.CatalogDatabaseType
import com.xueyuan.wata.daph.flink117.enums.CatalogDatabaseType._
import com.xueyuan.wata.daph.utils.JsonUtil
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.catalog.Catalog
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.hudi.table.catalog.HoodieCatalog
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.flink.{CatalogLoader, FlinkCatalog}
import org.apache.logging.log4j.scala.Logging

import scala.jdk.CollectionConverters.mapAsJavaMapConverter

object CatalogUtil extends Logging {

  /**
   * Creates a Flink [[Catalog]] instance from a flat configuration map.
   *
   * Required keys depend on the catalog type:
   *  - HIVE:    `name`, `default-database`, `hive-conf-dir`
   *  - JDBC:    `name`, `default-database`, `username`, `password`, `base-url`
   *  - HUDI:    `name` (all remaining entries are forwarded verbatim as Hudi catalog options)
   *  - ICEBERG: `name`, `default-database`, `levels` (comma-separated namespace levels),
   *             `hadoop-conf` (JSON object of Hadoop conf overrides), `props` (JSON object
   *             of Iceberg catalog properties), `loader-type` (hive | hadoop | rest | custom;
   *             custom additionally needs `impl`), `cacheEnabled`, `cacheExpirationIntervalMs`
   *
   * @param dbType the catalog type to create
   * @param config flat key/value configuration; a missing required key raises NoSuchElementException
   * @return the constructed catalog (not yet registered with any TableEnvironment)
   * @throws IllegalArgumentException for an unsupported dbType or iceberg loader-type
   */
  def createCatalog(dbType: CatalogDatabaseType.Value, config: Map[String, String]): Catalog = {
    dbType match {
      case HIVE =>
        val catalogName = config("name")
        val defaultDatabase = config("default-database")
        val hiveConfDir = config("hive-conf-dir")

        new HiveCatalog(catalogName, defaultDatabase, hiveConfDir)

      case JDBC =>
        val catalogName = config("name")
        val defaultDatabase = config("default-database")
        val username = config("username")
        val password = config("password")
        val url = config("base-url")

        new JdbcCatalog(catalogName, defaultDatabase, username, password, url)

      case HUDI =>
        val catalogName = config("name")
        // Everything except "name" is passed through as Hudi catalog options.
        val hudiConfig = config.filterNot(_._1 == "name")
        val conf = Configuration.fromMap(hudiConfig.asJava)

        new HoodieCatalog(catalogName, conf)

      case ICEBERG =>
        val catalogName = config("name")
        val defaultDatabase = config("default-database")

        // Namespace levels, e.g. "a,b,c" -> Namespace.of("a", "b", "c").
        val levels = config("levels").split(",")
        val namespace = Namespace.of(levels: _*)

        // "hadoop-conf" carries a JSON object of Hadoop configuration overrides.
        val hadoopConfM = JsonUtil.defaultMapper.readValue(config("hadoop-conf"), classOf[Map[String, String]])
        val hadoopConf = new org.apache.hadoop.conf.Configuration
        hadoopConfM.foreach { case (k, v) => hadoopConf.set(k, v) }

        // "props" carries a JSON object of Iceberg catalog properties.
        val props = JsonUtil.defaultMapper.readValue(config("props"), classOf[Map[String, String]])

        val catalogLoader = config("loader-type") match {
          case "hive"   => CatalogLoader.hive(catalogName, hadoopConf, props.asJava)
          case "hadoop" => CatalogLoader.hadoop(catalogName, hadoopConf, props.asJava)
          case "rest"   => CatalogLoader.rest(catalogName, hadoopConf, props.asJava)
          case "custom" => CatalogLoader.custom(catalogName, props.asJava, hadoopConf, config("impl"))
          case other =>
            // Fail fast with a descriptive message instead of an opaque scala.MatchError.
            throw new IllegalArgumentException(
              s"Unsupported iceberg loader-type '$other'; expected one of: hive, hadoop, rest, custom")
        }

        val cacheEnabled = config("cacheEnabled").toBoolean
        val cacheExpirationIntervalMs = config("cacheExpirationIntervalMs").toLong

        new FlinkCatalog(catalogName, defaultDatabase, namespace, catalogLoader, cacheEnabled, cacheExpirationIntervalMs)

      case other =>
        // Matches on Enumeration values are not exhaustiveness-checked by the compiler;
        // surface an unhandled catalog type with a clear error rather than a MatchError.
        throw new IllegalArgumentException(s"Unsupported catalog database type: $other")
    }
  }

  /**
   * Registers `catalog` under `catalogName` in `tableEnv`, unless a catalog with
   * that name is already registered. Safe to call repeatedly (idempotent).
   *
   * @param tableEnv    the table environment to register into
   * @param catalogName the name to register the catalog under
   * @param catalog     the catalog instance to register
   */
  def registerCatalog(tableEnv: TableEnvironment, catalogName: String, catalog: Catalog): Unit = {
    if (!tableEnv.listCatalogs().contains(catalogName))
      tableEnv.registerCatalog(catalogName, catalog)
  }


  /**
   * Switches the environment to `catalogName` and logs / returns a human-readable
   * summary of its catalogs, databases, tables and views.
   *
   * NOTE: this has the side effect of making `catalogName` the current catalog.
   *
   * @param tableEnv    the table environment to inspect
   * @param catalogName the catalog to switch to and describe
   * @return the formatted summary (also written to the log at INFO level)
   */
  def logAndProduceCatalogDetails(tableEnv: TableEnvironment, catalogName: String): String = {
    tableEnv.useCatalog(catalogName)

    val cs = tableEnv.listCatalogs()
    val dbs = tableEnv.listDatabases()
    val cc = tableEnv.getCurrentCatalog
    val cd = tableEnv.getCurrentDatabase
    val ts = tableEnv.listTables()
    val tts = tableEnv.listTemporaryTables()
    val vs = tableEnv.listViews()
    val tvs = tableEnv.listTemporaryViews()

    val res =
      s"""
         |Catalogs: ${cs.mkString("Array(", ", ", ")")}
         |Databases: ${dbs.mkString("Array(", ", ", ")")}
         |CurrentCatalog: $cc
         |CurrentDatabase: $cd
         |Tables: ${ts.mkString("Array(", ", ", ")")}
         |TemporaryTables: ${tts.mkString("Array(", ", ", ")")}
         |Views: ${vs.mkString("Array(", ", ", ")")}
         |TemporaryViews: ${tvs.mkString("Array(", ", ", ")")}
         |""".stripMargin
    logger.info(res)

    res
  }

}
